/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.e2e.connector.file.local;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.local.catalog.LocalFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.lang3.StringUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;

import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import io.airlift.compress.lzo.LzopCodec;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@DisabledOnContainer(
        value = {TestContainerId.SPARK_2_4},
        type = {},
        disabledReason = "The apache-compress version is not compatible with apache-poi")
@Slf4j
public class LocalFileIT extends TestSuiteBase {

    /** Container received by the extension hook; reused later to run commands inside it. */
    private GenericContainer<?> baseContainer;

    /**
     * Container extension hook: remembers the container, stages every fixture file under {@code
     * /seatunnel/read}, and finally creates the {@code /tmp/fake_empty} directory.
     */
    @TestContainerExtension
    private final ContainerExtendedFactory extendedFactory =
            target -> {
                this.baseContainer = target;

                // JSON fixtures: partitioned, GBK-encoded, and an LZO-compressed copy built here
                ContainerUtil.copyFileIntoContainers(
                        "/json/e2e.json",
                        "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
                        target);
                ContainerUtil.copyFileIntoContainers(
                        "/json/e2e_gbk.json", "/seatunnel/read/encoding/json/e2e_gbk.json", target);
                ContainerUtil.copyFileIntoContainers(
                        convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json")),
                        "/seatunnel/read/lzo_json/e2e.json",
                        target);

                // Text fixtures: partitioned, GBK-encoded, custom delimiter, time format, LZO
                ContainerUtil.copyFileIntoContainers(
                        "/text/e2e.txt",
                        "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
                        target);
                ContainerUtil.copyFileIntoContainers(
                        "/text/e2e_gbk.txt", "/seatunnel/read/encoding/text/e2e_gbk.txt", target);
                ContainerUtil.copyFileIntoContainers(
                        "/text/e2e_delimiter.txt", "/seatunnel/read/text_delimiter/e2e.txt", target);
                ContainerUtil.copyFileIntoContainers(
                        "/text/e2e_time_format.txt",
                        "/seatunnel/read/text_time_format/e2e.txt",
                        target);
                ContainerUtil.copyFileIntoContainers(
                        convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt")),
                        "/seatunnel/read/lzo_text/e2e.txt",
                        target);

                // Excel fixtures (.xlsx and .xls) share one partition directory
                ContainerUtil.copyFileIntoContainers(
                        "/excel/e2e.xlsx",
                        "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
                        target);
                ContainerUtil.copyFileIntoContainers(
                        "/excel/e2e.xls",
                        "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xls",
                        target);

                // ORC fixtures: partitioned file plus the one used by the cast job
                ContainerUtil.copyFileIntoContainers(
                        "/orc/e2e.orc",
                        "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
                        target);
                ContainerUtil.copyFileIntoContainers(
                        "/orc/orc_for_cast.orc", "/seatunnel/read/orc_cast/e2e.orc", target);

                // Parquet fixture
                ContainerUtil.copyFileIntoContainers(
                        "/parquet/e2e.parquet",
                        "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
                        target);

                // Binary fixture
                ContainerUtil.copyFileIntoContainers(
                        "/binary/cat.png", "/seatunnel/read/binary/cat.png", target);

                // Same .xlsx again, under a different name, for the filter job
                ContainerUtil.copyFileIntoContainers(
                        "/excel/e2e.xlsx",
                        "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                        target);

                // Empty directory inside the container
                target.execInContainer("mkdir", "-p", "/tmp/fake_empty");
            };

    /**
     * Executes the local-file job configs for excel, text, json, orc, parquet and binary formats
     * one after another on the given container.
     *
     * <p>The per-step comments below are derived from the config file names. The jobs are run in
     * a fixed order; keep "fake_to_local" (write) jobs ahead of the jobs that follow them.
     *
     * @param container the engine container the jobs are submitted to
     * @throws IOException declared for {@code helper.execute}
     * @throws InterruptedException declared for {@code helper.execute}
     */
    @TestTemplate
    public void testLocalFileReadAndWrite(TestContainer container)
            throws IOException, InterruptedException {
        TestHelper helper = new TestHelper(container);
        // test write local excel file
        helper.execute("/excel/fake_to_local_excel.conf");
        // test read local excel file
        helper.execute("/excel/local_excel_to_assert.conf");
        // test read local excel file with projection
        helper.execute("/excel/local_excel_projection_to_assert.conf");
        // test write local text file
        helper.execute("/text/fake_to_local_file_text.conf");
        // test read lzo-compressed local text file
        helper.execute("/text/local_file_text_lzo_to_assert.conf");
        // test read local text file with a custom delimiter
        helper.execute("/text/local_file_delimiter_assert.conf");
        // test read local text file with a time format
        helper.execute("/text/local_file_time_format_assert.conf");
        // test read skip header
        helper.execute("/text/local_file_text_skip_headers.conf");
        // test read local text file
        helper.execute("/text/local_file_text_to_assert.conf");
        // test read local text file with projection
        helper.execute("/text/local_file_text_projection_to_assert.conf");
        // test write local text file with assigning encoding
        helper.execute("/text/fake_to_local_file_with_encoding.conf");
        // test read local text file with assigning encoding
        helper.execute("/text/local_file_text_to_console_with_encoding.conf");

        // test write local json file
        helper.execute("/json/fake_to_local_file_json.conf");
        // test read local json file
        helper.execute("/json/local_file_json_to_assert.conf");
        // test read lzo-compressed local json file
        helper.execute("/json/local_file_json_lzo_to_console.conf");
        // test write local json file with assigning encoding
        helper.execute("/json/fake_to_local_file_json_with_encoding.conf");
        // test read local json file with assigning encoding
        helper.execute("/json/local_file_json_to_console_with_encoding.conf");

        // test write local orc file
        helper.execute("/orc/fake_to_local_file_orc.conf");
        // test read local orc file
        helper.execute("/orc/local_file_orc_to_assert.conf");
        // test read local orc file with projection
        helper.execute("/orc/local_file_orc_projection_to_assert.conf");
        // test read local orc file with projection and type cast
        helper.execute("/orc/local_file_orc_to_assert_with_time_and_cast.conf");
        // test write local parquet file
        helper.execute("/parquet/fake_to_local_file_parquet.conf");
        // test read local parquet file
        helper.execute("/parquet/local_file_parquet_to_assert.conf");
        // test read local parquet file with projection
        helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
        // test read filtered local file
        helper.execute("/excel/local_filter_excel_to_assert.conf");

        // test read empty directory
        helper.execute("/json/local_file_to_console.conf");
        helper.execute("/parquet/local_file_to_console.conf");

        // test binary file
        helper.execute("/binary/local_file_binary_to_local_file_binary.conf");
        if (!container.identifier().getEngineType().equals(EngineType.FLINK)) {
            // On Flink the file produced by local_file_binary_to_local_file_binary is generated in
            // the taskManager, so reading it from the jobManager would fail; the assertion job is
            // therefore only run on the other engines.
            helper.execute("/binary/local_file_binary_to_assert.conf");
        }
    }

    /**
     * Runs the JSON save-mode job twice and checks the number of entries in the sink directory:
     * none before the first run, exactly one after the first run, and still exactly one after the
     * second run.
     *
     * <p>NOTE(review): the unchanged count after the second run presumably reflects the save mode
     * configured in the conf file replacing existing data; confirm against the conf.
     *
     * @param container the engine container the job is submitted to
     * @throws IOException declared for {@code helper.execute}
     * @throws InterruptedException declared for {@code helper.execute}
     */
    @TestTemplate
    @DisabledOnContainer(
            value = {TestContainerId.SPARK_2_4},
            type = {EngineType.FLINK},
            disabledReason =
                    "Fink test is multi-node, LocalFile connector will use different containers for obtaining files")
    public void testLocalFileReadAndWriteWithSaveMode(TestContainer container)
            throws IOException, InterruptedException {
        TestHelper helper = new TestHelper(container);
        // test save_mode
        String path = "/tmp/seatunnel/localfile/json/fake";
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
        // "expected: <x> but was: <y>" failure message truthful.
        Assertions.assertEquals(
                0,
                getFileListFromContainer(path).size(),
                "sink directory should be empty before the first run");
        helper.execute("/json/fake_to_local_file_json_save_mode.conf");
        Assertions.assertEquals(
                1,
                getFileListFromContainer(path).size(),
                "sink directory should hold one file after the first run");
        helper.execute("/json/fake_to_local_file_json_save_mode.conf");
        Assertions.assertEquals(
                1,
                getFileListFromContainer(path).size(),
                "sink directory should still hold one file after the second run");
    }

    /**
     * Runs {@code ls -1 <path>} through {@code sh -c} inside the base container and returns the
     * non-empty lines of its standard output.
     *
     * <p>Standard error of the command is forwarded to {@link System#err} and is not part of the
     * result. Checked exceptions are rethrown unchecked via {@code @SneakyThrows}.
     *
     * @param path directory (inside the container) to list; appended verbatim to the command
     * @return the non-empty output lines, in the order {@code ls} printed them
     */
    @SneakyThrows
    private List<String> getFileListFromContainer(String path) {
        ExecCreateCmdResponse lsExec =
                dockerClient
                        .execCreateCmd(baseContainer.getContainerId())
                        .withCmd("sh", "-c", "ls -1 " + path)
                        .withAttachStdout(true)
                        .withAttachStderr(true)
                        .exec();

        // Capture stdout for parsing; block until the command has finished.
        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
        ExecStartResultCallback callback = new ExecStartResultCallback(stdout, System.err);
        dockerClient.execStartCmd(lsExec.getId()).exec(callback).awaitCompletion();

        String listing = new String(stdout.toByteArray(), StandardCharsets.UTF_8).trim();
        log.info("container path file list is :{}", listing);

        List<String> names = new ArrayList<>();
        for (String name : listing.split("\n")) {
            // An empty listing still yields one empty token from split(); skip it.
            if (StringUtils.isEmpty(name)) {
                continue;
            }
            log.info("container path file name is :{}", name);
            names.add(name);
        }
        return names;
    }

    /**
     * Walks a {@link LocalFileCatalog} rooted at {@code /tmp/seatunnel/json/test1} through a
     * create/drop cycle: the default table is absent, exists (without data) after {@code
     * createTable}, and is absent again after {@code dropTable}.
     *
     * <p>The {@code container} argument is not used by this method.
     *
     * @param container supplied by the test template
     * @throws IOException declared by the test signature
     * @throws InterruptedException declared by the test signature
     */
    @TestTemplate
    public void testLocalFileCatalog(TestContainer container)
            throws IOException, InterruptedException {
        HadoopFileSystemProxy localFs =
                new HadoopFileSystemProxy(new LocalFileHadoopConf());
        LocalFileCatalog catalog =
                new LocalFileCatalog(
                        localFs,
                        "/tmp/seatunnel/json/test1",
                        FileSystemType.LOCAL.getFileSystemPluginName());
        TablePath table = TablePath.DEFAULT;

        // Nothing exists before creation.
        Assertions.assertFalse(catalog.tableExists(table));

        // createTable is invoked with null for its first two arguments, as in the original test.
        catalog.createTable(null, null, false);
        Assertions.assertTrue(catalog.tableExists(table));
        Assertions.assertFalse(catalog.isExistsData(table));

        // Dropping removes the table again.
        catalog.dropTable(table, false);
        Assertions.assertFalse(catalog.tableExists(table));
    }

    /**
     * Writes an LZOP-compressed copy of {@code file} next to it, using the original absolute path
     * with {@code .lzo} appended.
     *
     * @param file the uncompressed source file; read fully into memory
     * @return the path of the compressed file that was written
     * @throws IOException if the source cannot be read or the compressed file cannot be written
     */
    private Path convertToLzoFile(File file) throws IOException {
        LzopCodec lzo = new LzopCodec();
        Path path = Paths.get(file.getAbsolutePath() + ".lzo");
        // try-with-resources closes both streams even when reading or writing fails; the raw
        // file stream is declared separately so it is released if the codec wrapper itself
        // cannot be created.
        try (OutputStream fileOut = Files.newOutputStream(path);
                OutputStream lzoOut = lzo.createOutputStream(fileOut)) {
            lzoOut.write(Files.readAllBytes(file.toPath()));
        }
        return path;
    }
}
